package cn.doitedu.api;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;

/**
 * A demo Flink source that continuously emits randomly generated user-behaviour
 * records, each serialized as a JSON string.
 *
 * <p>Every record carries the keys {@code uid}, {@code timestamp}, {@code event_id},
 * {@code session_id} and {@code properties}, for example (key order may vary):
 * <pre>
 * {"uid":1,"timestamp":12359238345,"event_id":"add_cart","session_id":"aBcDe","properties":{"k":"vv"}}
 * </pre>
 *
 * <p>The source keeps producing records until {@link #cancel()} is called.
 */
public class _06_CustomSourceFunction implements SourceFunction<String> {
    // Reused for every record; each loop iteration overwrites all of its keys.
    JSONObject jsonObject;
    // Candidate values for the "event_id" field.
    String[] events;

    // Written by cancel() (possibly from another thread) and read by the run() loop,
    // hence volatile so that the update is visible to the emitting thread.
    private volatile boolean running = true;

    /** Creates the source with its reusable JSON holder and the fixed list of event ids. */
    public _06_CustomSourceFunction(){
        jsonObject = new JSONObject();
        events = new String[]{"add_cart","item_share","ad_show","ad_click","page_load","video_play"};
    }

    /**
     * Emits one random record roughly every 200 ms until the source is cancelled.
     *
     * @param ctx the context used to emit the generated JSON strings
     * @throws Exception if the emitting thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {

        while (running) {
            // uid is drawn from [1, 10000); the event id is picked uniformly from `events`.
            jsonObject.put("uid", RandomUtils.nextInt(1,10000));
            jsonObject.put("timestamp",System.currentTimeMillis());
            jsonObject.put("event_id",events[RandomUtils.nextInt(0,events.length)]);
            jsonObject.put("session_id",RandomStringUtils.randomAlphabetic(5));

            // A single random entry: a one-letter key mapped to a two-letter value.
            HashMap<String, Object> propertiesMap = new HashMap<>();
            propertiesMap.put(RandomStringUtils.randomAlphabetic(1),RandomStringUtils.randomAlphabetic(2));

            jsonObject.put("properties",propertiesMap);

            ctx.collect(jsonObject.toJSONString());

            // Throttle the output rate.
            Thread.sleep(200);
        }
    }

    /** Signals the {@link #run(SourceContext)} loop to stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}
